# Receiving Data in Transit

Data in transit, also known as data in flight, refers to data that is in the process of being moved and is therefore not permanently stored in a location where it can remain in a static state. Streaming data, messages in a queue or topic of a messaging system, and requests sent to an HTTP listening port are a few examples.

The sources from which data in transit is received fall into two categories:

- Data publishers: You can receive data from these sources without subscribing to them (for example, HTTP, HTTPS, TCP, and email).
- Messaging systems: You need to subscribe to the source to receive data from it (for example, Kafka, JMS, and MQTT).

## Receiving data from data publishers

Data publishers are transports from which WSO2 Integrator: SI can receive messages without subscribing to them. In a typical scenario, you open a port in WSO2 Integrator: SI that is dedicated to listening for messages from the data publisher.

To receive data from a data publisher, define an input stream and connect to it a source annotation whose type receives data from a data publisher, as shown in the example below.

```
@source(type='http', receiver.url='http://localhost:5005/StudentRegistrationEP', @map(type = 'json'))
define stream StudentRegistrationStream (name string, course string);
```

In this example, each online student registration results in an HTTP request in JSON format being sent to the StudentRegistrationEP endpoint on port 5005 of localhost. The source generates an event in the StudentRegistrationStream stream for each of these requests.

### Try it out

To try out the example given above, include the source configuration in a Siddhi application and simulate events to it.

1. Open a new file and add the following Siddhi application to it.
    ```
    @App:name('StudentRegistrationApp')

    @source(type = 'http', receiver.url = "http://localhost:5005/StudentRegistrationEP",
            @map(type = 'json'))
    define stream StudentRegistrationStream (name string, course string);

    @sink(type = 'log', prefix = "New Student",
          @map(type = 'passThrough'))
    define stream StudentLogStream (name string, course string, total long);

    @info(name = 'TotalStudentsQuery')
    from StudentRegistrationStream
    select name, course, count() as total
    insert into StudentLogStream;
    ```

    Save the Siddhi application.

    This Siddhi application contains the http source from the previous example. The TotalStudentsQuery query selects each student registration captured as an HTTP request and directs it to the StudentLogStream output stream. Before the events are logged, the query uses the count() function to count the registrations received so far, and presents this running count as total. A log sink connected to StudentLogStream then logs each registration in the terminal.

2. Start the Siddhi application by clicking the play icon in the top panel.

3. To simulate events, issue the following two cURL commands.

    ```bash
    curl -X POST \
      http://localhost:5005/StudentRegistrationEP \
      -H 'content-type: application/json' \
      -d '{ "event": { "name": "John Doe", "course": "Graphic Design" } }'

    curl -X POST \
      http://localhost:5005/StudentRegistrationEP \
      -H 'content-type: application/json' \
      -d '{ "event": { "name": "Michelle Cole", "course": "Graphic Design" } }'
    ```

    The following is logged in the terminal.

    ```text
    INFO {io.siddhi.core.stream.output.sink.LogSink} - New Student : Event{timestamp=1603185021250, data=[John Doe, Graphic Design, 1], isExpired=false}
    INFO {io.siddhi.core.stream.output.sink.LogSink} - New Student : Event{timestamp=1603185486763, data=[Michelle Cole, Graphic Design, 2], isExpired=false}
    ```

### Supported transports

The following transports are supported for capturing data in transit from data publishers.
| Transport | Siddhi Extension |
|-----------|------------------|
| HTTP      | http             |
| TCP       | tcp              |
| Email     | email            |
| gRPC      | grpc             |
| WSO2Event | wso2event        |

### Supported mappers

Mappers determine the format in which the event is received. For information about transforming events by changing the format in which data is received or published, see Transforming Data.

The following mappers are supported when you receive data from data publishers.

| Mapper | Supporting Siddhi Extension |
|--------|-----------------------------|
| JSON   | json                        |
| XML    | xml                         |
| Text   | text                        |
| Avro   | avro                        |
| Binary | binary                      |

## Receiving data from messaging systems

This section explains how to receive input data from messaging systems, where WSO2 Integrator: SI needs to subscribe to specific queues or topics in order to receive the required data.

To receive data from a messaging system, define an input stream and connect to it a source annotation whose type receives data from a messaging system.

For example, consider a weather broadcasting application that publishes the temperature and humidity of each region it monitors to a separate Kafka topic. The local weather broadcasting firm in Houston wants to subscribe to the weather broadcasts for Houston.

```
@source(type='kafka', topic.list='houston', threading.option='single.thread', group.id='group1', bootstrap.servers='localhost:9092', @map(type='json'))
define stream TemperatureHumidityStream (temperature int, humidity int);
```

The Kafka source above connects to the Kafka bootstrap server at localhost:9092 and consumes the JSON messages sent to the Kafka topic named houston. For each message, it generates an event in the TemperatureHumidityStream stream.

### Try it out

To try the above example, follow the steps below.

1. Download the Kafka broker from the Apache site and extract it. This directory is referred to as `<KAFKA_HOME>` from here on.

2. Start Kafka as follows:

    1. First, start a ZooKeeper node. To do this, navigate to the `<KAFKA_HOME>` directory and issue the following command.

        ```bash
        sh bin/zookeeper-server-start.sh config/zookeeper.properties
        ```

    2. Next, start a Kafka server node.
        To do this, issue the following command from the same directory.

        ```bash
        sh bin/kafka-server-start.sh config/server.properties
        ```

3. To create a Kafka topic named houston, issue the following command from the same directory.

    ```bash
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic houston
    ```

4. Prepare the editor (with WSO2 Integrator: SI installed) to consume Kafka messages as follows:

    1. Open a new file and add the following Siddhi application to it.

        ```
        @App:name('TemperatureReportingApp')

        @source(type = 'kafka', topic.list = "houston", threading.option = "single.thread",
                group.id = "group1", bootstrap.servers = "localhost:9092",
                @map(type = 'json'))
        define stream TemperatureHumidityStream (temperature int, humidity int);

        @sink(type = 'log', prefix = "Temperature Update",
              @map(type = 'passThrough'))
        define stream OutputStream (temperature int, humidity int);

        @info(name = 'query1')
        from TemperatureHumidityStream
        select *
        insert into OutputStream;
        ```

        This Siddhi application includes the Kafka source that subscribes to the houston Kafka topic and generates an event in the TemperatureHumidityStream stream for each message in the topic (as described in the example in the previous section). The query1 query takes all these events from the TemperatureHumidityStream stream and inserts them into the OutputStream stream so that the log sink connected to OutputStream can log them.

    2. Save the Siddhi application.

5. Start the TemperatureReportingApp Siddhi application that you created and saved.

6. To generate a message in the houston Kafka topic, follow the steps below:

    1. To run the Kafka command line client, issue the following command from the `<KAFKA_HOME>` directory.

        ```bash
        bin/kafka-console-producer.sh --broker-list localhost:9092 --topic houston
        ```

    2. When you are prompted to type messages in the console, type the following:

        ```json
        {"event":{ "temperature":23, "humidity":99}}
        ```

        This pushes a message to the Kafka server.
        Then, the Siddhi application you deployed in WSO2 Integrator: SI consumes this message.

7. Check the logs of the editor (with WSO2 Integrator: SI installed). The Kafka message you generated is logged as follows:

    ```text
    INFO {io.siddhi.core.stream.output.sink.LogSink} - Temperature Update : Event{timestamp=1603339705244, data=[23, 99], isExpired=false}
    ```

### Supported transports

The following transports are supported for capturing data in transit from messaging systems.

| Transport      | Siddhi Extension |
|----------------|------------------|
| NATS           | nats             |
| Kafka          | kafka            |
| Google Pub/Sub | googlepubsub     |
| RabbitMQ       | rabbitmq         |
| JMS            | jms              |
| MQTT           | mqtt             |
| SQS            | sqs              |

### Supported mappers

Mappers determine the format in which the event is received. For information about transforming events by changing the format in which data is received or published, see Transforming Data.

The following mappers are supported when you receive data from messaging systems.

| Mapper   | Supporting Siddhi Extension |
|----------|-----------------------------|
| JSON     | json                        |
| XML      | xml                         |
| Text     | text                        |
| Avro     | avro                        |
| Binary   | binary                      |
| Protobuf | protobuf                    |
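The Kafka example above relies on the default JSON mapping, which expects each message to be wrapped in an event object such as {"event":{ "temperature":23, "humidity":99}}. If a producer publishes a flat payload instead, the JSON mapper can map each stream attribute to a JSONPath expression through an @attributes annotation. The following Siddhi application is a sketch under stated assumptions, not part of the original walkthrough: the flat payload shape {"temp": 23, "hum": 99}, the temp and hum field names, the CustomMappingTemperatureApp name, and the group2 consumer group are all hypothetical. Check the siddhi-map-json documentation for the exact JSONPath rules supported by your version.

```
@App:name('CustomMappingTemperatureApp')

-- Assumption (hypothetical): messages on the houston topic look like
-- {"temp": 23, "hum": 99}, with no "event" wrapper.
-- @attributes maps each stream attribute to a JSONPath expression
-- evaluated against the incoming message.
@source(type = 'kafka', topic.list = "houston", threading.option = "single.thread",
        group.id = "group2", bootstrap.servers = "localhost:9092",
        @map(type = 'json',
             @attributes(temperature = '$.temp', humidity = '$.hum')))
define stream TemperatureHumidityStream (temperature int, humidity int);

-- Log every mapped event, as in TemperatureReportingApp.
@sink(type = 'log', prefix = "Temperature Update",
      @map(type = 'passThrough'))
define stream OutputStream (temperature int, humidity int);

@info(name = 'query1')
from TemperatureHumidityStream
select *
insert into OutputStream;
```

A separate group.id is used here so that this application does not share partition assignments and committed offsets with TemperatureReportingApp if both run at the same time. With this application running, typing {"temp": 23, "hum": 99} at the console producer prompt should produce an event with a temperature of 23 and a humidity of 99 in OutputStream.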